storage/sources: Implement 'CREATE TABLE .. FROM SOURCE' parsing and planning for Kafka sources #29383
Conversation
```rust
if let Some(format) = &format {
    f.write_node(format);
```
I saw that the AST printing of `FormatSpecifier` adds a leading space itself, but I think the approach in the other types is that if a leading space is required then it's the outer function's responsibility. So I think there should be an `f.write_str(" ")` here, and then the formatter of `format` should immediately begin with `FORMAT`.
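For illustration, a minimal sketch of that convention, assuming the `AstDisplay`/`AstFormatter` shapes from the parser crate (the statement impl and field names here are illustrative, not the PR's exact code):

```rust
// Minimal sketch of the convention: the outer formatter owns the separating
// space, and the inner node's formatter begins directly with its keyword.
impl<T: AstInfo> AstDisplay for CreateTableFromSourceStatement<T> {
    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
        // ...earlier clauses elided...
        if let Some(format) = &self.format {
            f.write_str(" "); // outer function writes the leading space
            f.write_node(format); // inner formatter starts with `FORMAT ...`
        }
    }
}
```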
Makes sense - pushed a commit to address this
```rust
// during purification and populate the `columns` and `constraints` fields for the statement,
// whereas other source-types (e.g. kafka, single-output load-gen sources) do not, so instead
// we use the source connection's default schema.
let (key_desc, value_desc) = if !columns.is_empty() || !constraints.is_empty() {
```
Can you explain this more? It's certainly possible that a pg source is requested to ingest an upstream table with no columns, so this code path can trigger for those types too. My main question is: why do we not fix this divergence at purification time, so that we deal with uniform data here?
Ah, good point on the pg source with empty columns. I'll make a fix for that.

The goal of this conditional was to accommodate both styles in which we've implemented purification/planning for our various source types to determine the output relations passed to the encoding/envelope stage, without completely undertaking a refactor of where all these things happen (between purification and planning).

The first style is what we do for mysql/postgres and multi-output load-gen sources: purification retrieves an external schema and turns it into the appropriate column types. In those cases the columns/constraints are also stored on the purified statement in the catalog, such that here we just need to turn them into a `RelationDesc`.

The second style is what I found we did for kafka and single-output load-gen sources: do hardly any work in purification at all, besides retrieving any schema-registry schemas and storing those in the purified statement. You'll see that the purification work we do for kafka 'tables' is really minimal; all we do is check that the external reference is either not specified or is the topic, and insert it if needed. This is why the kafka source exports can just use the default key/value desc, since those are standard across all 'outputs' and will be turned into the appropriate `RelationDesc` in the `apply_source_envelope_encoding` method below.
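A hedged sketch of the branch being described (the `desc_from_table_defs` helper and the key-side handling in the first branch are assumptions, not the PR's exact code):

```rust
let (key_desc, value_desc) = if !columns.is_empty() || !constraints.is_empty() {
    // Style 1 (postgres/mysql/multi-output load-gen): purification already
    // resolved the upstream schema into column definitions stored on the
    // statement, so rebuild the RelationDesc from columns/constraints.
    let value_desc = desc_from_table_defs(scx, &columns, &constraints)?;
    (source_connection.default_key_desc(), value_desc)
} else {
    // Style 2 (kafka, single-output load-gen): purification stored no
    // columns, so start from the connection's static defaults; the final
    // RelationDesc is produced later by apply_source_envelope_encoding.
    (
        source_connection.default_key_desc(),
        source_connection.default_value_desc(),
    )
};
```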
Actually, this should work fine for 'empty' postgres tables, since the `default_value_desc` for postgres is an empty `RelationDesc`.
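(A sketch of what that likely looks like on the postgres connection, assuming `RelationDesc::empty()`:)

```rust
// Hedged sketch: postgres's default value desc is an empty RelationDesc,
// so an upstream table with no columns plans to the same (empty) relation
// through either branch of the conditional above.
fn default_value_desc(&self) -> RelationDesc {
    RelationDesc::empty()
}
```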
```rust
/// Defines the key schema to use by default for this source connection type.
/// This will be used for the primary export of the source and as the default
/// pre-encoding key schema for the source.
fn default_key_desc(&self) -> RelationDesc;
```
Is this a method that we will need after we migrate to the new system? The way I was envisioning these methods evolving was to instead return a map of `ExternalReference -> RelationDesc`, but it's unclear if this fits with the new way of things.
> The way I was envisioning that these methods would evolve was to instead return a map of `ExternalReference -> RelationDesc` but it's unclear if this fits with the new way of things

That hasn't been necessary; since this trait is implemented on the top-level source 'connection', it's really only used to determine details related to the primary export, if any, of a source. The external-reference stuff is also resolved at purification time, on a per-export basis, and not necessary beyond that. I actually just realized that the `external_reference()` method on this trait is not used by anything, so I will remove it since it's likely just confusing.

> Is this a method that we will need after we migrate to the new system?

Yes and no -- when we've fully migrated we will no longer need the `primary_export_details` method below, since sources won't have a 'primary export'. And these `default_value_desc` and `default_key_desc` methods are essentially just returning static values that could be defined elsewhere, but they work well here out of convenience. They are only necessary for the source types that determine their output relations entirely based on the envelope/encoding options for each export.
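Putting those pieces together, a hedged sketch of the trait surface being discussed (only `default_key_desc` appears verbatim in the diff; the other signatures are assumptions based on this thread):

```rust
pub trait SourceConnection {
    /// Defines the key schema to use by default for this source connection
    /// type, used for the primary export and as the default pre-encoding
    /// key schema.
    fn default_key_desc(&self) -> RelationDesc;

    /// Defines the value schema to use by default for this source
    /// connection type.
    fn default_value_desc(&self) -> RelationDesc;

    /// Details of the source's primary export, if any. Expected to become
    /// unnecessary once all source outputs are modeled as tables and
    /// sources no longer have a 'primary export'.
    fn primary_export_details(&self) -> SourceExportDetails;
}
```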
```diff
@@ -1560,8 +1560,11 @@ pub struct CreateTableFromSourceStatement<T: AstInfo> {
 pub constraints: Vec<TableConstraint<T>>,
 pub if_not_exists: bool,
 pub source: T::ItemName,
-pub external_reference: UnresolvedItemName,
+pub external_reference: Option<UnresolvedItemName>,
```
Does this PR implement the thing where, if no external reference is provided and it's unambiguous, it's accepted? I didn't see any test like that added, but this change looks like we accept it now?
Discussed offline - it only implements that for Kafka and single-output load generator sources, which only ever expose one output to reference. Will submit a future PR to make this uniform across all source types, such that you can omit a reference on a MySQL or Postgres source that only has one upstream table to ingest.
This is the file I feel I understood the least. Is it possible to write a short explanation of what changes happen here?
Yep - just added a bunch of comments inline to try and explain the various changes made
```rust
connection: &mut CreateSourceConnection<Aug>,
options: &SourceFormatOptions,
```
explaining these changes: this refactor to introduce `SourceFormatOptions` was necessary since I needed to re-use this `purify_source_format` method inside the `purify_create_table_from_source` method, which doesn't have a reference to the `CreateSourceConnection<Aug>` object. The only thing the connection was used for was to know whether this was a kafka source and, if so, what the topic was, for retrieving the appropriate schema-registry details.
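A hedged sketch of what that refactor implies (the enum shape is inferred from this comment, not copied from the PR):

```rust
// SourceFormatOptions carries only the bits of connection info that format
// purification actually needs, so callers without a CreateSourceConnection
// (like purify_create_table_from_source) can still construct it.
pub(crate) enum SourceFormatOptions {
    /// No source-specific information is needed to purify the format.
    Default,
    /// Kafka sources carry the topic, used to derive the default
    /// schema-registry subjects for the key and value schemas.
    Kafka { topic: String },
}
```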
I see. A bit unfortunate that we have this coupling between determining the schema of the format and the kind of source being used. A way out would be to pass the external reference as the schema-registry subject name, which coincides with the topic name for kafka sources. Not something we have to do in this PR.
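A hedged sketch of that decoupling (`default_subjects` is a hypothetical helper; the `<topic>-key`/`<topic>-value` pattern is the Confluent TopicNameStrategy convention):

```rust
// Derive schema-registry subjects from the external reference alone; for
// kafka sources the reference coincides with the topic name, so no
// source-specific connection details are needed in this path.
fn default_subjects(external_reference: &str) -> (String, String) {
    (
        format!("{external_reference}-key"),
        format!("{external_reference}-value"),
    )
}
```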
```rust
// We only determine the table description for multi-output load generator sources here,
// whereas single-output load generators will have their relation description
// determined during statement planning as envelope and format options may affect their
// schema.
if let Some(desc) = desc {
    let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
    *columns = gen_columns;
    *constraints = gen_constraints;
}
```
this is a consequence of how load-generators work: the `generator.views()` method only returns a list of views for multi-output load generators, whereas single-output load-generators are assumed to determine their relations using the encoding/envelope options, similar to kafka sources
```rust
let mut views = load_gen_connection.load_generator.views();
if views.is_empty() {
    // If there are no views then this load-generator just has a single output
    // and the external reference specified in the statement should be the same
    // as the load-generator's schema name or empty.
    match external_reference {
        None => {}
        Some(reference) => {
            if reference.0.len() != 1
                || load_gen_connection.load_generator.schema_name()
                    != reference.0.last().unwrap().as_str()
            {
                Err(LoadGeneratorSourcePurificationError::WrongLoadGenerator(
                    load_gen_connection.load_generator.schema_name().to_string(),
                    reference.clone(),
                ))?;
            }
        }
    }
    PurifiedSourceExport {
        external_reference: UnresolvedItemName(vec![Ident::new_unchecked(
            load_gen_connection.load_generator.schema_name(),
        )]),
        details: PurifiedExportDetails::LoadGenerator {
            table: None,
            output: LoadGeneratorOutput::Default,
        },
    }
} else {
    let external_reference = external_reference.as_ref().ok_or(
        LoadGeneratorSourcePurificationError::MultiOutputRequiresExternalReference,
    )?;
    // Resolve the desired view reference to a `schema_name.view_name` reference
    let resolver = SourceReferenceResolver::new(
        LOAD_GENERATOR_DATABASE_NAME,
        &views
            .iter()
            .map(|(view_name, _, _)| {
                (load_gen_connection.load_generator.schema_name(), *view_name)
            })
            .collect_vec(),
    )?;
    let (qualified_reference, view_idx) = resolver.resolve(&external_reference.0, 2)?;

    let (_, table, output) = views.swap_remove(view_idx);
    PurifiedSourceExport {
        external_reference: qualified_reference,
        details: PurifiedExportDetails::LoadGenerator {
            table: Some(table),
            output,
        },
    }
}
```
This logic is the distinction between multi-output and single-output load-generator sources. Referencing my earlier comment in `plan/statement/ddl.rs`: multi-output load-generators behave similarly to mysql/postgres sources in that they determine their full output relations here during purification, whereas single-output load-generators behave like kafka sources. I think we could do more work in the future to try to unify these semantics.

This also implements the slight difference in requirements for the external-reference specification: if it's a single-output load-generator then the reference is optional; otherwise it's required.
```rust
// The requested external reference for this table which will be resolved below to a
// fully-qualified / normalized reference for each source type.
let requested_reference = match external_reference {
    Some(reference) => ExternalReferenceExport {
        reference: reference.clone(),
        alias: None,
    },
    None => sql_bail!(
        "An external reference is required for {} sources",
        connection_name
    ),
};
```
this and the identical change in the MySQL case below are just to accommodate the fact that the external reference is now optional on the `CREATE TABLE FROM SOURCE` statement but required for these two source types
I see, this is introducing source-specific behavior for how external references work, which we should avoid. I think what we want to do is always take the list of available references from the connection and then try to see if we have an unambiguous match. There shouldn't be anything source-specific in this process. This means that a pg source that exposes a single table should happily accept an empty reference.

It's hard to manually ensure that we don't accidentally introduce source-specific behavior, so when the refactor dust starts to settle we should invest some time putting this behavior behind traits so that we are forced to create a model for sources.
> I think what we want to do is always take the list of available references from the connection and then try to see if we have an unambiguous match.

I've discussed this in previous PRs, but this would require a non-trivial refactor of the purification logic for mysql and postgres sources that I'd really rather not bring into scope for this work, due to the end-of-September deadline. We only retrieve the upstream information for the table that is explicitly requested in the external reference for postgres and mysql sources right now. If we retrieve all upstream tables first and then down-select to the appropriate table in generic purification logic, we need to completely refactor the text-column and ignore-column handling in `postgres::purify_source_exports` and `mysql::purify_source_exports` to split up the logic for looking up references, retrieving upstream schemas, and applying text-column/ignore-column references, as well as all the error handling associated with that. It's definitely possible, but a lot of extra work.

As it stands, I've already isolated all code related to external references to purification: dealing with external references is now gone from all the downstream logic in planning and source rendering, which I count as a big win.

Separately, as a design point: we should consider the latency and memory implications of adding a 'fetch all upstream schemas, tables, and their table-schemas' step to `CREATE TABLE .. FROM SOURCE` purification just so we can down-select to a single table immediately afterwards behind the cleaner abstraction. It's probably fine in most cases, but might introduce new brittleness when dealing with very large upstream databases.
> We only retrieve the upstream information for the table that is explicitly requested in the external-reference for postgres and mysql sources right now.

> we should consider the latency and memory implications of adding a 'fetch all upstream schemas, tables, and their table-schemas' step to CREATE TABLE .. FROM SOURCE purification just so we can down-select to a single table immediately afterwards behind the cleaner abstraction.

That is a good point, and the interface could definitely be the other way around. Specifically, each source could implement a trait that includes a method like this:

```rust
async fn resolve_external_reference(&self, reference: Option<ExternalReference>) -> Self::Metadata;
```

The main point is to structure the code in such a way that source-specific behavior is hard to express. Having source-specific behavior has bitten us countless times and is the reason why source planning is the Rube Goldberg machine that it is. So whenever I see it I'm worried that it might add yet another hitch to this Gordian knot.

> this work due to the end-of-September deadline

There should be some flexibility on that, but I hear you. I know I'm repeating myself with these comments, but it's only because I'm worried about future complications. We can proceed by carefully keeping these constraints in mind, but I would like to find the time and space to contain these source abstractions. It can happen at a later point, but it will need our discipline to not deprioritize it because there is a new thing with a new deadline that takes priority.
Thinking a bit more about it, I think we'll have to (at least for now) fetch all upstream sources on `CREATE TABLE FROM SOURCE` in order to support the empty reference, even if the interface is not refactored behind a clean trait.
> Thinking a bit more about it, I think we'll have to (at least for now) fetch all upstream sources on CREATE TABLE FROM SOURCE in order to support the empty reference, even if the interface is not refactored behind a clean trait.

If we want to support submitting an empty reference for postgres and mysql, yes we will. I plan to address this in a future PR, but based on our discussion here and in slack, I think your proposal for something similar to:

```rust
async fn resolve_external_reference(&self, reference: Option<ExternalReference>) -> Self::Metadata;
```

makes sense.

And in each source implementation, we can choose whether we need to retrieve 'all' upstream references to disambiguate the reference, or can just retrieve the singular upstream table info if the reference is already qualified. This means that we'll only pay the additional latency cost of having to retrieve all tables if the user chooses to provide a less-qualified reference or omit the reference entirely.
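A hedged sketch of how one source's implementation might apply that strategy (the trait name, the `Result` wrapping, and the helper methods here are illustrative assumptions, not proposed API):

```rust
impl ExternalReferenceResolution for PostgresSourceConnection {
    type Metadata = PostgresTableDesc;

    async fn resolve_external_reference(
        &self,
        reference: Option<ExternalReference>,
    ) -> Result<Self::Metadata, PlanError> {
        match reference {
            // A fully-qualified reference needs only a single upstream
            // lookup: the common case pays no extra latency.
            Some(r) if r.is_fully_qualified() => self.fetch_table(&r).await,
            // A partial or omitted reference requires fetching all candidate
            // tables and down-selecting to an unambiguous match.
            other => {
                let candidates = self.fetch_all_tables().await?;
                disambiguate(candidates, other)
            }
        }
    }
}
```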
```rust
    mz_storage_types::sources::load_generator::LoadGenerator::KeyValue(_)
) {
    sql_bail!(
        "{} is a {} source, which does not support CREATE TABLE .. FROM SOURCE.",
```
What is preventing this type of source from using `CREATE TABLE .. FROM SOURCE`?
I didn't realize that key-value load-generators use a completely separate rendering mechanism, which I haven't yet refactored to support multiple source-exports:

materialize/src/storage/src/source/generator.rs, lines 144 to 162 at f922274:
```rust
match self {
    GeneratorKind::Simple {
        tick_micros,
        as_of,
        up_to,
        generator,
    } => render_simple_generator(
        generator,
        tick_micros,
        as_of.into(),
        up_to.into(),
        scope,
        config,
        committed_uppers,
    ),
    GeneratorKind::KeyValue(kv) => {
        key_value::render(kv, scope, config, committed_uppers, start_signal)
    }
}
```
So I need to do that refactoring first before they can support added tables.
```rust
let (_, desired_view_desc, output) = views.swap_remove(view_idx);
let mut views = load_gen_connection.load_generator.views();
if views.is_empty() {
```
Continuing with my comment above, we should get each source to present its available references and have a common piece of source-agnostic code that selects a subset of them based on what the user typed.
LGTM! Talked offline (i.e. online in Slack) about making room in the roadmap to try to make subsource handling uniform across all sources.

Thanks - created the issue to track that work here: https://github.com/MaterializeInc/database-issues/issues/8550, as part of the overall epic.
Motivation
Fixes https://github.com/MaterializeInc/database-issues/issues/8492
Fixes https://github.com/MaterializeInc/database-issues/issues/8493
Also enables `CREATE TABLE .. FROM SOURCE` statements for single-output load-gen sources (except key-value, which needs separate refactoring).

Tips for reviewer

Likely easier to follow commit-by-commit.
Checklist
If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.